package com.ruyuan.eshop.push.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.NamedDaemonThreadFactory;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformCouponMessage;
import com.ruyuan.eshop.common.message.PlatformHotProductMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormHotProductListener implements MessageListenerConcurrently {

    /**
     * 定义一个业务线程池
     */
    private static final int PERMITS = 30;
    private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
    private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
    private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
        if (initializedRef.compareAndSet(false, true)) {
            THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
                    PERMITS, PERMITS * 2,
                    60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000),
                    NamedDaemonThreadFactory.getInstance("consumeCouponMsg"),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return THREAD_POOL_EXECUTOR;
    };

    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            // 方式一：使用默认的commonPool来处理任务
            // supplyAsync(Supplier<U> supplier) API
            // 默认使用的是 ForkJoinPool.commonPool() 这个线程池
            // 该线程池在jvm内是唯一的，默认的线程数量是cpu的核数减1
            // 如果觉得不线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPool.common.parallelism 的值调整commonPool的并行度，或者采用方式二
            /*List<CompletableFuture<HashMap<Long,Boolean>>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                    .collect(Collectors.toList());*/

            // 方式二：使用自定的业务线程池来处理任务
            List<CompletableFuture<HashMap<Long,Boolean>>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
                    .collect(Collectors.toList());

            List<HashMap<Long,Boolean>> resultList = futureList.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

        }catch (Exception e){
            log.error("consume error,热门商品通知消息消费失败", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public HashMap<Long,Boolean> handleMessageExt(MessageExt messageExt) {
        log.info("执行热门商品通知消息消逻辑，消息内容：{}", messageExt.getBody());
        String msg = new String(messageExt.getBody());
        PlatformHotProductMessage platformHotProductMessage = JSON.parseObject(msg , PlatformHotProductMessage.class);
        log.info("热门商品通知消息，商品名称:{}", platformHotProductMessage.getGoodsName());

        return informByPush(platformHotProductMessage);
    }

    /**
     * 第三方平台推送消息到app
     *
     * @param message
     */
    private HashMap<Long,Boolean> informByPush(PlatformHotProductMessage message){
        StringBuffer buffer = new StringBuffer("速戳!精致小物件,");
        buffer.append(message.getGoodsName() + ", ");
        buffer.append(message.getGoodsDesc());
        List<String> keyWords = message.getKeyWords();
        for (String keyWord : keyWords){
            buffer.append(keyWord);
        }
        // 注意，这里一般都是调用第三方API，把id信息填充进去
        buffer.append(message.getAccountId());
        log.info("消息推送中：消息内容：{}", buffer.toString());

        HashMap<Long,Boolean> pushResultMap = new HashMap();
        pushResultMap.put(message.getAccountId(),true);

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
           new BaseBizException("调用第三方API异常");
        }
        return pushResultMap;
    }
}
